# This file is part of cloud-init. See LICENSE file for license information.

import json
import os
from copy import copy, deepcopy

from cloudinit import helpers, settings, util
from cloudinit.net import eni, network_state
from cloudinit.sources import DataSourceConfigDrive as ds
from cloudinit.sources.helpers import openstack
from tests.unittests.helpers import CiTestCase, ExitStack, mock, populate_dir

# SSH public key referenced by both EC2_META["public-keys"] and
# OSTACK_META["public_keys"].
PUBKEY = "ssh-rsa AAAAB3NzaC1....sIkJhq8wdX+4I3A4cYbYP ubuntu@server-460\n"
# EC2-style metadata.  It is JSON-serialized into the
# "ec2/<version>/meta-data.json" entries of CFG_DRIVE_FILES_V2.
EC2_META = {
    "ami-id": "ami-00000001",
    "ami-launch-index": 0,
    "ami-manifest-path": "FIXME",
    "block-device-mapping": {
        "ami": "sda1",
        "ephemeral0": "sda2",
        "root": "/dev/sda1",
        "swap": "sda3",
    },
    "hostname": "sm-foo-test.novalocal",
    "instance-action": "none",
    "instance-id": "i-00000001",
    "instance-type": "m1.tiny",
    "local-hostname": "sm-foo-test.novalocal",
    "local-ipv4": None,
    "placement": {"availability-zone": "nova"},
    "public-hostname": "sm-foo-test.novalocal",
    "public-ipv4": "",
    "public-keys": {"0": {"openssh-key": PUBKEY}},
    "reservation-id": "r-iru5qm4m",
    "security-groups": ["default"],
}
# Raw user-data payload stored under the "user-data" / "user_data" entries
# of CFG_DRIVE_FILES_V2.
USER_DATA = b"#!/bin/sh\necho This is user data\n"
# Serialized into the "vendor_data.json" entries of CFG_DRIVE_FILES_V2.
VENDOR_DATA = {
    "magic": "",
}
# Serialized into the "vendor_data2.json" entries of CFG_DRIVE_FILES_V2.
VENDOR_DATA2 = {
    "static": "",
}
# OpenStack-style metadata.  It is JSON-serialized into the
# "openstack/<version>/meta_data.json" entries of CFG_DRIVE_FILES_V2.
# The "files" entries point at the "openstack/content/0000" and
# "openstack/content/0001" payloads (CONTENT_0 and CONTENT_1).
OSTACK_META = {
    "availability_zone": "nova",
    "files": [
        {"content_path": "/content/0000", "path": "/etc/foo.cfg"},
        {"content_path": "/content/0001", "path": "/etc/bar/bar.cfg"},
    ],
    "hostname": "sm-foo-test.novalocal",
    "meta": {"dsmode": "local", "my-meta": "my-value"},
    "name": "sm-foo-test",
    "public_keys": {"mykey": PUBKEY},
    "uuid": "b0fa911b-69d4-4476-bbe2-1c92bff6535c",
}

# Payloads stored at "openstack/content/0000" and "openstack/content/0001"
# in CFG_DRIVE_FILES_V2.
CONTENT_0 = b"This is contents of /etc/foo.cfg\n"
CONTENT_1 = b"# this is /etc/bar/bar.cfg\n"
# Network JSON with three "ovs" links, each attached to one "ipv4_dhcp"
# network, plus two DNS services.  Only the third link carries an explicit
# "name" ("nic0").  Serialized into the "network_data.json" entries of
# CFG_DRIVE_FILES_V2.
NETWORK_DATA = {
    "services": [
        {"type": "dns", "address": "199.204.44.24"},
        {"type": "dns", "address": "199.204.47.54"},
    ],
    "links": [
        {
            "vif_id": "2ecc7709-b3f7-4448-9580-e1ec32d75bbd",
            "ethernet_mac_address": "fa:16:3e:69:b0:58",
            "type": "ovs",
            "mtu": None,
            "id": "tap2ecc7709-b3",
        },
        {
            "vif_id": "2f88d109-5b57-40e6-af32-2472df09dc33",
            "ethernet_mac_address": "fa:16:3e:d4:57:ad",
            "type": "ovs",
            "mtu": None,
            "id": "tap2f88d109-5b",
        },
        {
            "vif_id": "1a5382f8-04c5-4d75-ab98-d666c1ef52cc",
            "ethernet_mac_address": "fa:16:3e:05:30:fe",
            "type": "ovs",
            "mtu": None,
            "id": "tap1a5382f8-04",
            "name": "nic0",
        },
    ],
    "networks": [
        {
            "link": "tap2ecc7709-b3",
            "type": "ipv4_dhcp",
            "network_id": "6d6357ac-0f70-4afa-8bd7-c274cc4ea235",
            "id": "network0",
        },
        {
            "link": "tap2f88d109-5b",
            "type": "ipv4_dhcp",
            "network_id": "d227a9b3-6960-4d94-8976-ee5788b44f54",
            "id": "network1",
        },
        {
            "link": "tap1a5382f8-04",
            "type": "ipv4_dhcp",
            "network_id": "dab2ba57-cae2-4311-a5ed-010b263891f5",
            "id": "network2",
        },
    ],
}

# Network JSON with two "vif" links (eth0, eth1), each with one static
# "ipv4" network.  Only the eth0 network defines a route: a default route
# (0.0.0.0/0.0.0.0) via 2.2.2.9.
NETWORK_DATA_2 = {
    "services": [
        {"type": "dns", "address": "1.1.1.191"},
        {"type": "dns", "address": "1.1.1.4"},
    ],
    "networks": [
        {
            "network_id": "d94bbe94-7abc-48d4-9c82-4628ea26164a",
            "type": "ipv4",
            "netmask": "255.255.255.248",
            "link": "eth0",
            "routes": [
                {
                    "netmask": "0.0.0.0",
                    "network": "0.0.0.0",
                    "gateway": "2.2.2.9",
                }
            ],
            "ip_address": "2.2.2.10",
            "id": "network0-ipv4",
        },
        {
            "network_id": "ca447c83-6409-499b-aaef-6ad1ae995348",
            "type": "ipv4",
            "netmask": "255.255.255.224",
            "link": "eth1",
            "routes": [],
            "ip_address": "3.3.3.24",
            "id": "network1-ipv4",
        },
    ],
    "links": [
        {
            "ethernet_mac_address": "fa:16:3e:dd:50:9a",
            "mtu": 1500,
            "type": "vif",
            "id": "eth0",
            "vif_id": "vif-foo1",
        },
        {
            "ethernet_mac_address": "fa:16:3e:a8:14:69",
            "mtu": 1500,
            "type": "vif",
            "id": "eth1",
            "vif_id": "vif-foo2",
        },
    ],
}

# This network data has 'tap' or null type for a link.
# The first link carries both a static ipv4 and a static ipv6 network;
# the second link (type None) carries one ipv4 network with two routes.
NETWORK_DATA_3 = {
    "services": [
        {"type": "dns", "address": "172.16.36.11"},
        {"type": "dns", "address": "172.16.36.12"},
    ],
    "networks": [
        {
            "network_id": "7c41450c-ba44-401a-9ab1-1604bb2da51e",
            "type": "ipv4",
            "netmask": "255.255.255.128",
            "link": "tap77a0dc5b-72",
            "ip_address": "172.17.48.18",
            "id": "network0",
            "routes": [
                {
                    "netmask": "0.0.0.0",
                    "network": "0.0.0.0",
                    "gateway": "172.17.48.1",
                }
            ],
        },
        {
            "network_id": "7c41450c-ba44-401a-9ab1-1604bb2da51e",
            "type": "ipv6",
            "netmask": "ffff:ffff:ffff:ffff::",
            "link": "tap77a0dc5b-72",
            "ip_address": "fdb8:52d0:9d14:0:f816:3eff:fe9f:70d",
            "id": "network1",
            "routes": [
                {
                    "netmask": "::",
                    "network": "::",
                    "gateway": "fdb8:52d0:9d14::1",
                }
            ],
        },
        {
            "network_id": "1f53cb0e-72d3-47c7-94b9-ff4397c5fe54",
            "type": "ipv4",
            "netmask": "255.255.255.128",
            "link": "tap7d6b7bec-93",
            "ip_address": "172.16.48.13",
            "id": "network2",
            "routes": [
                {
                    "netmask": "0.0.0.0",
                    "network": "0.0.0.0",
                    "gateway": "172.16.48.1",
                },
                {
                    "netmask": "255.255.0.0",
                    "network": "172.16.0.0",
                    "gateway": "172.16.48.1",
                },
            ],
        },
    ],
    "links": [
        {
            "ethernet_mac_address": "fa:16:3e:dd:50:9a",
            "mtu": None,
            "type": "tap",
            "id": "tap77a0dc5b-72",
            "vif_id": "77a0dc5b-720e-41b7-bfa7-1b2ff62e0d48",
        },
        {
            "ethernet_mac_address": "fa:16:3e:a8:14:69",
            "mtu": None,
            "type": None,
            "id": "tap7d6b7bec-93",
            "vif_id": "7d6b7bec-93e6-4c03-869a-ddc5014892d5",
        },
    ],
}

# MAC address assigned to the "bond0" link in NETWORK_DATA_BOND.
BOND_MAC = "fa:16:3e:b3:72:36"
# Network JSON with two "phy" links (eth0, eth1) bonded as "bond0", and two
# "vlan" links (ids 602 and 612) whose "vlan_link" is bond0.  Each vlan link
# has one static ipv4 network.
NETWORK_DATA_BOND = {
    "services": [
        {"type": "dns", "address": "1.1.1.191"},
        {"type": "dns", "address": "1.1.1.4"},
    ],
    "networks": [
        {
            "id": "network2-ipv4",
            "ip_address": "2.2.2.13",
            "link": "vlan2",
            "netmask": "255.255.255.248",
            "network_id": "4daf5ce8-38cf-4240-9f1a-04e86d7c6117",
            "type": "ipv4",
            "routes": [
                {
                    "netmask": "0.0.0.0",
                    "network": "0.0.0.0",
                    "gateway": "2.2.2.9",
                }
            ],
        },
        {
            "id": "network3-ipv4",
            "ip_address": "10.0.1.5",
            "link": "vlan3",
            "netmask": "255.255.255.248",
            "network_id": "a9e2f47c-3c43-4782-94d0-e1eeef1c8c9d",
            "type": "ipv4",
            "routes": [
                {
                    "netmask": "255.255.255.255",
                    "network": "192.168.1.0",
                    "gateway": "10.0.1.1",
                }
            ],
        },
    ],
    "links": [
        {
            "ethernet_mac_address": "0c:c4:7a:34:6e:3c",
            "id": "eth0",
            "mtu": 1500,
            "type": "phy",
        },
        {
            "ethernet_mac_address": "0c:c4:7a:34:6e:3d",
            "id": "eth1",
            "mtu": 1500,
            "type": "phy",
        },
        {
            "bond_links": ["eth0", "eth1"],
            "bond_miimon": 100,
            "bond_mode": "4",
            "bond_xmit_hash_policy": "layer3+4",
            "ethernet_mac_address": BOND_MAC,
            "id": "bond0",
            "type": "bond",
        },
        {
            "ethernet_mac_address": "fa:16:3e:b3:72:30",
            "id": "vlan2",
            "type": "vlan",
            "vlan_id": 602,
            "vlan_link": "bond0",
            "vlan_mac_address": "fa:16:3e:b3:72:30",
        },
        {
            "ethernet_mac_address": "fa:16:3e:66:ab:a6",
            "id": "vlan3",
            "type": "vlan",
            "vlan_id": 612,
            "vlan_link": "bond0",
            "vlan_mac_address": "fa:16:3e:66:ab:a6",
        },
    ],
}

# Network JSON with one "phy" link (eth0) and one "vlan" link (vlan1,
# vlan_id 602) whose "vlan_link" is eth0.  The vlan link has one static
# ipv4 network.
NETWORK_DATA_VLAN = {
    "services": [{"type": "dns", "address": "1.1.1.191"}],
    "networks": [
        {
            "id": "network1-ipv4",
            "ip_address": "10.0.1.5",
            "link": "vlan1",
            "netmask": "255.255.255.248",
            "network_id": "a9e2f47c-3c43-4782-94d0-e1eeef1c8c9d",
            "type": "ipv4",
            "routes": [
                {
                    "netmask": "255.255.255.255",
                    "network": "192.168.1.0",
                    "gateway": "10.0.1.1",
                }
            ],
        }
    ],
    "links": [
        {
            "ethernet_mac_address": "fa:16:3e:69:b0:58",
            "id": "eth0",
            "mtu": 1500,
            "type": "phy",
        },
        {
            "ethernet_mac_address": "fa:16:3e:b3:72:30",
            "id": "vlan1",
            "type": "vlan",
            "vlan_id": 602,
            "vlan_link": "eth0",
            "vlan_mac_address": "fa:16:3e:b3:72:30",
        },
    ],
}

# MAC address -> interface name mapping.  Tests pass it as the known_macs
# argument of openstack.convert_net_json().
KNOWN_MACS = {
    "fa:16:3e:69:b0:58": "enp0s1",
    "fa:16:3e:d4:57:ad": "enp0s2",
    "fa:16:3e:dd:50:9a": "foo1",
    "fa:16:3e:a8:14:69": "foo2",
    "fa:16:3e:ed:9a:59": "foo3",
    "0c:c4:7a:34:6e:3d": "oeth1",
    "0c:c4:7a:34:6e:3c": "oeth0",
}

# Relative path -> file content for a version 2 config drive tree.  Tests
# write it to a temporary directory with populate_dir().
CFG_DRIVE_FILES_V2 = {
    "ec2/2009-04-04/meta-data.json": json.dumps(EC2_META),
    "ec2/2009-04-04/user-data": USER_DATA,
    "ec2/latest/meta-data.json": json.dumps(EC2_META),
    "ec2/latest/user-data": USER_DATA,
    "openstack/2012-08-10/meta_data.json": json.dumps(OSTACK_META),
    "openstack/2012-08-10/user_data": USER_DATA,
    "openstack/content/0000": CONTENT_0,
    "openstack/content/0001": CONTENT_1,
    "openstack/latest/meta_data.json": json.dumps(OSTACK_META),
    "openstack/latest/user_data": USER_DATA,
    "openstack/latest/vendor_data.json": json.dumps(VENDOR_DATA),
    "openstack/latest/vendor_data2.json": json.dumps(VENDOR_DATA2),
    "openstack/latest/network_data.json": json.dumps(NETWORK_DATA),
    "openstack/2015-10-15/meta_data.json": json.dumps(OSTACK_META),
    "openstack/2015-10-15/user_data": USER_DATA,
    "openstack/2015-10-15/network_data.json": json.dumps(NETWORK_DATA),
    "openstack/2015-10-15/vendor_data.json": json.dumps(VENDOR_DATA),
    "openstack/2015-10-15/vendor_data2.json": json.dumps(VENDOR_DATA2),
}

# Dotted-path prefix used to build mock.patch targets inside the
# DataSourceConfigDrive module.
M_PATH = "cloudinit.sources.DataSourceConfigDrive."


class TestConfigDriveDataSource(CiTestCase):
    def setUp(self):
        """Stub util.find_devs_with to return [] and create a temp dir."""
        super().setUp()
        self.add_patch(
            M_PATH + "util.find_devs_with",
            "m_find_devs_with",
            return_value=[],
        )
        self.tmp = self.tmp_dir()

    def test_ec2_metadata(self):
        """EC2 metadata read from a v2 config drive dir equals EC2_META."""
        populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
        found = ds.read_config_drive(self.tmp)
        # assertIn reports the container on failure, whereas
        # assertTrue(x in y) only says "False is not true".
        self.assertIn("ec2-metadata", found)
        self.assertEqual(EC2_META, found["ec2-metadata"])

    def test_dev_os_remap(self):
        """Device aliases map to /dev/vdX when the first exists check fails.

        util.find_devs_with is mocked to report the device with its first
        letter replaced by "s", and os.path.exists is mocked to return
        False on its first call and True on its second.
        """
        populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
        cfg_ds = ds.DataSourceConfigDrive(
            settings.CFG_BUILTIN, None, helpers.Paths({})
        )
        cfg_ds.metadata = ds.read_config_drive(self.tmp)["metadata"]
        expected_devices = {
            "ami": "/dev/vda1",
            "root": "/dev/vda1",
            "ephemeral0": "/dev/vda2",
            "swap": "/dev/vda3",
        }
        for alias, expected_dev in expected_devices.items():
            # e.g. "/dev/vda1" -> "vda1" -> "sda1"
            short_name = expected_dev[len("/dev/") :]
            reported_name = "s" + short_name[1:]
            # A two-item side_effect list makes the mocked os.path.exists()
            # return False on its first call and True on its second.
            with mock.patch.object(
                util, "find_devs_with", return_value=[reported_name]
            ) as find_mock, mock.patch.object(
                os.path, "exists", side_effect=[False, True]
            ) as exists_mock:
                self.assertEqual(
                    expected_dev, cfg_ds.device_name_to_device(alias)
                )

                find_mock.assert_called_once_with(mock.ANY)
                self.assertEqual(exists_mock.call_count, 2)

    def test_dev_os_map(self):
        """Device aliases map to the device reported by find_devs_with.

        os.path.exists is mocked to always return True; both mocks are
        expected to be called exactly once per alias.
        """
        populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
        cfg_ds = ds.DataSourceConfigDrive(
            settings.CFG_BUILTIN, None, helpers.Paths({})
        )
        cfg_ds.metadata = ds.read_config_drive(self.tmp)["metadata"]
        expected_devices = {
            "ami": "/dev/vda1",
            "root": "/dev/vda1",
            "ephemeral0": "/dev/vda2",
            "swap": "/dev/vda3",
        }
        for alias, expected_dev in expected_devices.items():
            with mock.patch.object(
                util, "find_devs_with", return_value=[expected_dev]
            ) as find_mock, mock.patch.object(
                os.path, "exists", return_value=True
            ) as exists_mock:
                self.assertEqual(
                    expected_dev, cfg_ds.device_name_to_device(alias)
                )

                find_mock.assert_called_once_with(mock.ANY)
                exists_mock.assert_called_once_with(mock.ANY)

    def test_dev_ec2_remap(self):
        """With EC2 metadata set, aliases map to /dev/vdX after one miss.

        os.path.exists is mocked to return False on its first call and
        True on its second.  Unknown aliases and None map to None.
        """
        populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
        cfg_ds = ds.DataSourceConfigDrive(
            settings.CFG_BUILTIN, None, helpers.Paths({})
        )
        found = ds.read_config_drive(self.tmp)
        cfg_ds.ec2_metadata = found["ec2-metadata"]
        cfg_ds.metadata = found["metadata"]
        expected_devices = {
            "ami": "/dev/vda1",
            "root": "/dev/vda1",
            "ephemeral0": "/dev/vda2",
            "swap": "/dev/vda3",
            None: None,
            "bob": None,
            "root2k": None,
        }
        for alias, expected_dev in expected_devices.items():
            # A fresh two-item side_effect list per alias: the mocked
            # os.path.exists() returns False first, then True.
            exists_patch = mock.patch.object(
                os.path, "exists", side_effect=[False, True]
            )
            with exists_patch:
                self.assertEqual(
                    expected_dev, cfg_ds.device_name_to_device(alias)
                )
                # The call count for os.path.exists() is deliberately not
                # asserted: not every entry above results in two calls.
                # Specifically, 'root2k' doesn't seem to call it at all.

    def test_dev_ec2_map(self):
        """With EC2 metadata set and paths existing, aliases map to /dev/sdX.

        os.path.exists is mocked to always return True.  Unknown aliases
        and None map to None.
        """
        populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
        cfg_ds = ds.DataSourceConfigDrive(
            settings.CFG_BUILTIN, None, helpers.Paths({})
        )
        found = ds.read_config_drive(self.tmp)
        cfg_ds.ec2_metadata = found["ec2-metadata"]
        cfg_ds.metadata = found["metadata"]
        expected_devices = {
            "ami": "/dev/sda1",
            "root": "/dev/sda1",
            "ephemeral0": "/dev/sda2",
            "swap": "/dev/sda3",
            None: None,
            "bob": None,
            "root2k": None,
        }
        # The mock's answer never varies, so one patch covers every alias.
        with mock.patch.object(os.path, "exists", return_value=True):
            for alias, expected_dev in expected_devices.items():
                self.assertEqual(
                    expected_dev, cfg_ds.device_name_to_device(alias)
                )

    def test_dir_valid(self):
        """A populated v2 directory yields every expected result section."""
        populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
        found = ds.read_config_drive(self.tmp)

        # The reader is expected to add these two derived keys.
        expected_md = copy(OSTACK_META)
        expected_md.update(
            {
                "instance-id": OSTACK_META["uuid"],
                "local-hostname": OSTACK_META["hostname"],
            }
        )

        expected_sections = (
            ("userdata", USER_DATA),
            ("metadata", expected_md),
            ("networkdata", NETWORK_DATA),
            ("vendordata", VENDOR_DATA),
            ("vendordata2", VENDOR_DATA2),
        )
        for section, expected in expected_sections:
            self.assertEqual(expected, found[section])

        injected_files = found["files"]
        self.assertEqual(injected_files["/etc/foo.cfg"], CONTENT_0)
        self.assertEqual(injected_files["/etc/bar/bar.cfg"], CONTENT_1)

    def test_seed_dir_valid_extra(self):
        """Unrelated extra files leave the metadata that is read unchanged."""
        data = copy(CFG_DRIVE_FILES_V2)
        data.update(
            {
                "myfoofile.txt": "myfoocontent",
                "openstack/latest/random-file.txt": "random-content",
            }
        )
        populate_dir(self.tmp, data)

        found = ds.read_config_drive(self.tmp)

        expected_md = copy(OSTACK_META)
        expected_md.update(
            {
                "instance-id": OSTACK_META["uuid"],
                "local-hostname": OSTACK_META["hostname"],
            }
        )
        self.assertEqual(expected_md, found["metadata"])

    def test_seed_dir_bad_json_metadata(self):
        """Verify that bad json in metadata raises openstack.BrokenMetadata."""
        garbage = "non-json garbage {}"
        data = copy(CFG_DRIVE_FILES_V2)
        # Corrupt meta_data.json under every openstack version directory.
        for version in ("2012-08-10", "2015-10-15", "latest"):
            data["openstack/%s/meta_data.json" % version] = garbage

        populate_dir(self.tmp, data)

        with self.assertRaises(openstack.BrokenMetadata):
            ds.read_config_drive(self.tmp)

    def test_seed_dir_no_configdrive(self):
        """Verify that a dir without config drive files raises NonReadable.

        The directory is populated with unrelated files only, so this
        exercises an existing-but-invalid directory rather than a missing
        one (the missing case is covered by test_seed_dir_missing).
        """
        my_d = os.path.join(self.tmp, "non-configdrive")
        # None of these paths is a config drive metadata file.
        data = {
            "myfoofile.txt": "myfoocontent",
            "openstack/latest/random-file.txt": "random-content",
            "content/foo": "foocontent",
        }
        # Previously `data` was built but never written, so my_d did not
        # exist and the test silently duplicated the missing-dir case.
        populate_dir(my_d, data)

        self.assertRaises(openstack.NonReadable, ds.read_config_drive, my_d)

    def test_seed_dir_missing(self):
        """Verify that a missing seed_dir raises openstack.NonReadable."""
        missing_dir = os.path.join(self.tmp, "nonexistantdirectory")
        with self.assertRaises(openstack.NonReadable):
            ds.read_config_drive(missing_dir)

    def test_find_candidates(self):
        """find_candidate_devs returns the expected devices per scenario.

        util.find_devs_with and util.is_partition are replaced with fakes
        driven by a criteria -> devices table.
        """
        answers = {}

        def fake_devs_with(*args, **kwargs):
            # The criteria may arrive positionally or as a keyword.
            if args:
                criteria = args[0]
            else:
                criteria = kwargs.pop("criteria", None)
            return answers.get(criteria, [])

        def fake_is_partition(dev):
            if dev.startswith("sr"):
                return False
            return dev[-1] in "0123456789"

        scenarios = [
            (
                {
                    "TYPE=vfat": [],
                    "TYPE=iso9660": ["/dev/vdb"],
                    "LABEL=config-2": ["/dev/vdb"],
                },
                ["/dev/vdb"],
            ),
            # add a vfat item
            # zdd reverse sorts after vdb, but config-2 label is preferred
            (
                {
                    "TYPE=vfat": ["/dev/zdd"],
                    "TYPE=iso9660": ["/dev/vdb"],
                    "LABEL=config-2": ["/dev/vdb"],
                },
                ["/dev/vdb", "/dev/zdd"],
            ),
            # verify that partitions are considered, that have correct label.
            (
                {
                    "TYPE=vfat": ["/dev/sda1"],
                    "TYPE=iso9660": [],
                    "LABEL=config-2": ["/dev/vdb3"],
                },
                ["/dev/vdb3"],
            ),
            # Verify that uppercase labels are also found.
            (
                {
                    "TYPE=vfat": [],
                    "TYPE=iso9660": ["/dev/vdb"],
                    "LABEL=CONFIG-2": ["/dev/vdb"],
                },
                ["/dev/vdb"],
            ),
        ]

        # patch.object restores the previous attributes on exit, which is
        # what the hand-written try/finally used to do.
        with mock.patch.object(
            util, "find_devs_with", fake_devs_with
        ), mock.patch.object(util, "is_partition", fake_is_partition):
            for scenario_answers, expected in scenarios:
                answers.clear()
                answers.update(scenario_answers)
                self.assertEqual(expected, ds.find_candidate_devs())

    @mock.patch(M_PATH + "on_first_boot")
    def test_pubkeys_v2(self, on_first_boot):
        """Public keys and platform names are exposed for config-drive-v2."""
        myds = cfg_ds_from_dir(self.tmp, files=CFG_DRIVE_FILES_V2)
        expected_keys = [OSTACK_META["public_keys"]["mykey"]]
        expected_subplatform = "seed-dir (%s/seed)" % self.tmp

        self.assertEqual(myds.get_public_ssh_keys(), expected_keys)
        self.assertEqual("configdrive", myds.cloud_name)
        self.assertEqual("openstack", myds.platform)
        self.assertEqual(expected_subplatform, myds.subplatform)

    def test_subplatform_config_drive_when_starts_with_dev(self):
        """subplatform is 'config-disk (<dev>)' for a /dev/ candidate."""
        cfg_ds = ds.DataSourceConfigDrive(
            settings.CFG_BUILTIN, None, helpers.Paths({})
        )
        with mock.patch(
            M_PATH + "find_candidate_devs"
        ) as m_find_devs, mock.patch(M_PATH + "util.mount_cb"), mock.patch(
            M_PATH + "on_first_boot"
        ):
            m_find_devs.return_value = ["/dev/anything"]
            got_data = cfg_ds.get_data()
            self.assertEqual(True, got_data)
        self.assertEqual("config-disk (/dev/anything)", cfg_ds.subplatform)


@mock.patch(
    "cloudinit.net.is_openvswitch_internal_interface",
    mock.Mock(return_value=False),
)
class TestNetJson(CiTestCase):
    def setUp(self):
        """Create a temp dir and disable diff truncation in assertions."""
        super().setUp()
        self.maxDiff = None
        self.tmp = self.tmp_dir()

    @mock.patch(M_PATH + "on_first_boot")
    def test_network_data_is_found(self, on_first_boot):
        """The datasource built from config-drive-v2 has network_json set."""
        datasource = cfg_ds_from_dir(self.tmp, files=CFG_DRIVE_FILES_V2)
        network_json = datasource.network_json
        self.assertIsNotNone(network_json)

    @mock.patch(M_PATH + "on_first_boot")
    def test_network_config_is_converted(self, on_first_boot):
        """network_config on the ds equals the converted NETWORK_DATA."""
        datasource = cfg_ds_from_dir(self.tmp, files=CFG_DRIVE_FILES_V2)
        expected_config = openstack.convert_net_json(
            NETWORK_DATA,
            known_macs=KNOWN_MACS,
        )
        self.assertEqual(datasource.network_config, expected_config)

    def test_network_config_conversion_dhcp6(self):
        """Convert DHCPv6 stateless and stateful networks.

        Two links are given, one per DHCPv6 network type; the expected
        output carries "accept-ra" only on the stateful interface.
        """
        links = [
            {
                "vif_id": "2ecc7709-b3f7-4448-9580-e1ec32d75bbd",
                "ethernet_mac_address": "fa:16:3e:69:b0:58",
                "type": "ovs",
                "mtu": None,
                "id": "tap2ecc7709-b3",
            },
            {
                "vif_id": "2f88d109-5b57-40e6-af32-2472df09dc33",
                "ethernet_mac_address": "fa:16:3e:d4:57:ad",
                "type": "ovs",
                "mtu": None,
                "id": "tap2f88d109-5b",
            },
        ]
        networks = [
            {
                "link": "tap2ecc7709-b3",
                "type": "ipv6_dhcpv6-stateless",
                "network_id": "6d6357ac-0f70-4afa-8bd7-c274cc4ea235",
                "id": "network0",
            },
            {
                "link": "tap2f88d109-5b",
                "type": "ipv6_dhcpv6-stateful",
                "network_id": "d227a9b3-6960-4d94-8976-ee5788b44f54",
                "id": "network1",
            },
        ]
        net_json = {"links": links, "networks": networks}

        expected = {
            "version": 1,
            "config": [
                {
                    "mac_address": "fa:16:3e:69:b0:58",
                    "mtu": None,
                    "name": "enp0s1",
                    "subnets": [{"type": "ipv6_dhcpv6-stateless"}],
                    "type": "physical",
                },
                {
                    "mac_address": "fa:16:3e:d4:57:ad",
                    "mtu": None,
                    "name": "enp0s2",
                    "subnets": [{"type": "ipv6_dhcpv6-stateful"}],
                    "type": "physical",
                    "accept-ra": True,
                },
            ],
        }

        converted = openstack.convert_net_json(
            net_json, known_macs=KNOWN_MACS
        )
        self.assertEqual(expected, converted)

    def test_network_config_conversions(self):
        """Convert several network json inputs and compare with expectations.

        Each case pairs an input document with the version 1 network
        config that convert_net_json is expected to produce for it.
        """
        dhcp_expected = {
            "version": 1,
            "config": [
                {
                    "subnets": [{"type": "dhcp4"}],
                    "type": "physical",
                    "mac_address": "fa:16:3e:69:b0:58",
                    "name": "enp0s1",
                    "mtu": None,
                },
                {
                    "subnets": [{"type": "dhcp4"}],
                    "type": "physical",
                    "mac_address": "fa:16:3e:d4:57:ad",
                    "name": "enp0s2",
                    "mtu": None,
                },
                {
                    "subnets": [{"type": "dhcp4"}],
                    "type": "physical",
                    "mac_address": "fa:16:3e:05:30:fe",
                    "name": "nic0",
                    "mtu": None,
                },
                {
                    "type": "nameserver",
                    "address": "199.204.44.24",
                },
                {
                    "type": "nameserver",
                    "address": "199.204.47.54",
                },
            ],
        }

        bridge_input = {
            "services": [{"type": "dns", "address": "172.19.0.12"}],
            "networks": [
                {
                    "network_id": "dacd568d-5be6-4786-91fe-750c374b78b4",
                    "type": "ipv4",
                    "netmask": "255.255.252.0",
                    "link": "tap1a81968a-79",
                    "routes": [
                        {
                            "netmask": "0.0.0.0",
                            "network": "0.0.0.0",
                            "gateway": "172.19.3.254",
                        }
                    ],
                    "ip_address": "172.19.1.34",
                    "id": "network0",
                }
            ],
            "links": [
                {
                    "type": "bridge",
                    "vif_id": "1a81968a-797a-400f-8a80-567f997eb93f",
                    "ethernet_mac_address": "fa:16:3e:ed:9a:59",
                    "id": "tap1a81968a-79",
                    "mtu": None,
                }
            ],
        }
        bridge_expected = {
            "version": 1,
            "config": [
                {
                    "name": "foo3",
                    "mac_address": "fa:16:3e:ed:9a:59",
                    "mtu": None,
                    "type": "physical",
                    "subnets": [
                        {
                            "address": "172.19.1.34",
                            "netmask": "255.255.252.0",
                            "type": "static",
                            "ipv4": True,
                            "routes": [
                                {
                                    "gateway": "172.19.3.254",
                                    "netmask": "0.0.0.0",
                                    "network": "0.0.0.0",
                                }
                            ],
                        }
                    ],
                },
                {
                    "type": "nameserver",
                    "address": "172.19.0.12",
                },
            ],
        }

        cases = (
            (NETWORK_DATA, dhcp_expected),
            (bridge_input, bridge_expected),
        )
        for net_json, expected in cases:
            converted = openstack.convert_net_json(
                net_json, known_macs=KNOWN_MACS
            )
            self.assertEqual(expected, converted)


@mock.patch(
    "cloudinit.net.is_openvswitch_internal_interface",
    mock.Mock(return_value=False),
)
class TestConvertNetworkData(CiTestCase):
    """Tests for openstack.convert_net_json using this module's fixtures."""

    with_logs = True

    def setUp(self):
        super().setUp()
        # Scratch directory used as the render target for ENI output.
        self.tmp = self.tmp_dir()

    def _getnames_in_config(self, ncfg):
        """Return the set of names of all "physical" entries in ncfg."""
        return {
            entry["name"]
            for entry in ncfg["config"]
            if entry["type"] == "physical"
        }

    def _physical_macs_by_name(self, ncfg):
        """Return {name: mac_address} for all "physical" entries in ncfg."""
        return {
            entry["name"]: entry["mac_address"]
            for entry in ncfg["config"]
            if entry["type"] == "physical"
        }

    def _render_eni(self, ncfg):
        """Render ncfg via the ENI renderer under self.tmp; return the text."""
        eni.Renderer().render_network_state(
            network_state.parse_net_config_data(ncfg), target=self.tmp
        )
        eni_path = os.path.join(self.tmp, "etc", "network", "interfaces")
        with open(eni_path, "r") as stream:
            return stream.read()

    def test_conversion_fills_names(self):
        converted = openstack.convert_net_json(
            NETWORK_DATA, known_macs=KNOWN_MACS
        )
        self.assertEqual(
            self._getnames_in_config(converted),
            {"nic0", "enp0s1", "enp0s2"},
        )

    @mock.patch("cloudinit.net.get_interfaces_by_mac")
    def test_convert_reads_system_prefers_name(self, get_interfaces_by_mac):
        # No known_macs is passed below, so the mocked system lookup is
        # what supplies the mac -> name mapping.
        get_interfaces_by_mac.return_value = {
            **KNOWN_MACS,
            "fa:16:3e:05:30:fe": "foonic1",
            "fa:16:3e:69:b0:58": "ens1",
        }

        converted = openstack.convert_net_json(NETWORK_DATA)
        self.assertEqual(
            self._getnames_in_config(converted),
            {"nic0", "ens1", "enp0s2"},
        )

    def test_convert_raises_value_error_on_missing_name(self):
        unrelated_macs = {"aa:aa:aa:aa:aa:00": "ens1"}
        with self.assertRaises(ValueError):
            openstack.convert_net_json(NETWORK_DATA, known_macs=unrelated_macs)

    def test_conversion_with_route(self):
        ncfg = openstack.convert_net_json(
            NETWORK_DATA_2, known_macs=KNOWN_MACS
        )
        # not the best test, but see that we get a route in the
        # network config and that it gets rendered to an ENI file
        all_routes = [
            route
            for item in ncfg["config"]
            for subnet in item.get("subnets", [])
            for route in subnet.get("routes", [])
        ]
        default_route = {
            "network": "0.0.0.0",
            "netmask": "0.0.0.0",
            "gateway": "2.2.2.9",
        }
        self.assertIn(default_route, all_routes)

        eni_text = self._render_eni(ncfg)
        self.assertIn("route add default gw 2.2.2.9", eni_text)

    def test_conversion_with_tap(self):
        ncfg = openstack.convert_net_json(
            NETWORK_DATA_3, known_macs=KNOWN_MACS
        )
        physical_names = {
            item["name"]
            for item in ncfg["config"]
            if item.get("type") == "physical"
        }
        self.assertEqual(physical_names, {"foo1", "foo2"})

    def test_bond_conversion(self):
        # light testing of bond conversion and eni rendering of bond
        ncfg = openstack.convert_net_json(
            NETWORK_DATA_BOND, known_macs=KNOWN_MACS
        )
        eni_text = self._render_eni(ncfg)

        # Verify there are expected interfaces in the net config.
        named_types = ("vlan", "bond", "physical")
        found = sorted(
            item["name"]
            for item in ncfg["config"]
            if item["type"] in named_types
        )
        self.assertEqual(
            sorted(["oeth0", "oeth1", "bond0", "bond0.602", "bond0.612"]),
            found,
        )

        # 'eth0' and 'eth1' are the ids. Because their mac addresses
        # map to other names, we should not see them in the ENI.
        tokens = eni_text.split()
        for link_id in ("eth0", "eth1"):
            self.assertNotIn(link_id, tokens)

        # oeth0 and oeth1 are the interface names for eni.
        # bond0 will be generated for the bond. Each should be auto.
        for stanza in ("auto oeth0", "auto oeth1", "auto bond0"):
            self.assertIn(stanza, eni_text)

        # The bond should have the given mac address
        bond_start = eni_text.find("auto bond0")
        self.assertIn(BOND_MAC, eni_text[bond_start:])

    def test_vlan(self):
        # light testing of vlan config conversion and eni rendering
        ncfg = openstack.convert_net_json(
            NETWORK_DATA_VLAN, known_macs=KNOWN_MACS
        )
        eni_text = self._render_eni(ncfg)

        for fragment in (
            "iface enp0s1",
            "address 10.0.1.5",
            "auto enp0s1.602",
        ):
            self.assertIn(fragment, eni_text)

    def test_mac_addrs_can_be_upper_case(self):
        # input mac addresses on rackspace may be upper case
        upper_netdata = deepcopy(NETWORK_DATA)
        for link in upper_netdata["links"]:
            link["ethernet_mac_address"] = link["ethernet_mac_address"].upper()

        ncfg = openstack.convert_net_json(
            upper_netdata, known_macs=KNOWN_MACS
        )

        expected = {
            "nic0": "fa:16:3e:05:30:fe",
            "enp0s1": "fa:16:3e:69:b0:58",
            "enp0s2": "fa:16:3e:d4:57:ad",
        }
        self.assertEqual(expected, self._physical_macs_by_name(ncfg))

    def test_unknown_device_types_accepted(self):
        # If we don't recognise a link, we should treat it as physical for a
        # best-effort boot
        odd_netdata = deepcopy(NETWORK_DATA)
        odd_netdata["links"][0]["type"] = "my-special-link-type"

        ncfg = openstack.convert_net_json(odd_netdata, known_macs=KNOWN_MACS)

        expected = {
            "nic0": "fa:16:3e:05:30:fe",
            "enp0s1": "fa:16:3e:69:b0:58",
            "enp0s2": "fa:16:3e:d4:57:ad",
        }
        self.assertEqual(expected, self._physical_macs_by_name(ncfg))

        # We should, however, warn the user that we don't recognise the type
        self.assertIn(
            "Unknown network_data link type (my-special-link-type)",
            self.logs.getvalue(),
        )


def cfg_ds_from_dir(base_d, files=None):
    """Build a DataSourceConfigDrive seeded from a directory under base_d.

    Creates ``<base_d>/run`` and uses it as the datasource's run_dir, points
    the datasource's seed_dir at ``<base_d>/seed`` (populated with ``files``
    when given), and sets its known_macs to a copy of KNOWN_MACS before
    calling get_data().

    Returns the datasource; raises RuntimeError if get_data() reports
    failure.
    """
    run_dir = os.path.join(base_d, "run")
    os.mkdir(run_dir)

    paths = helpers.Paths({"run_dir": run_dir})
    datasource = ds.DataSourceConfigDrive(settings.CFG_BUILTIN, None, paths)
    datasource.seed_dir = os.path.join(base_d, "seed")
    if files:
        populate_dir(datasource.seed_dir, files)
    # Copy so the datasource cannot mutate the shared module-level fixture.
    datasource.known_macs = KNOWN_MACS.copy()

    if datasource.get_data():
        return datasource
    raise RuntimeError(
        "Data source did not extract itself from seed directory %s"
        % datasource.seed_dir
    )


def populate_ds_from_read_config(cfg_ds, source, results):
    """Copy read_config_drive_dir results onto a DataSourceConfigDrive.

    Sets the attributes on cfg_ds directly from ``results`` (missing keys
    become None), hopefully matching what a successful cfg_ds.get_data()
    call would have left behind, then derives ``_network_config`` from the
    stored network_json using KNOWN_MACS.
    """
    cfg_ds.source = source

    # (attribute on cfg_ds, key in results), applied in this order.
    attr_to_result_key = (
        ("metadata", "metadata"),
        ("ec2_metadata", "ec2-metadata"),
        ("userdata_raw", "userdata"),
        ("vendordata_raw", "vendordata"),
        ("vendordata2_raw", "vendordata2"),
        ("version", "version"),
        ("network_json", "networkdata"),
    )
    for attr_name, result_key in attr_to_result_key:
        setattr(cfg_ds, attr_name, results.get(result_key))

    cfg_ds._network_config = openstack.convert_net_json(
        cfg_ds.network_json, known_macs=KNOWN_MACS
    )
